package com.primeton.poctag.task;

import com.alibaba.fastjson.JSONObject;
import com.primeton.poctag.configure.TaskProperties;
import com.primeton.poctag.service.warn.WarnService;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;


/**
 * Runs a SparkSQL flow as a local shell process.
 *
 * <p>Builds command-line arguments from the task configuration, launches the
 * {@code sparksql-flow-runner.sh} script through Apache Commons Exec, streams the
 * process output into the task log directory, and registers the running task with
 * {@link JobQueue} so it can be observed and cancelled.
 *
 * Created by zhaopx on 2018/1/3 0003-18:06
 */
public class ShellNativeLocalTask extends SparkTask {


    private static final Logger LOG = LoggerFactory.getLogger(ShellNativeLocalTask.class);

    /**
     * JobId of the task (quartz job this task belongs to).
     */
    protected final String jobId;

    /**
     * Task id supplied by the front end; falls back to a generated UUID when blank.
     */
    protected final String originTaskId;

    /**
     * Path of the SparkSQL flow XML definition, passed to the runner via {@code -f}.
     */
    protected final String flowXmlFile;

    /**
     * Parameters handed to the task; each non-null entry becomes a
     * {@code -p 'key=value'} command-line argument.
     */
    protected final Map<String, String> taskParams;

    /**
     * Alert service forwarded to the process result handler.
     */
    WarnService warnService;

    /**
     * Task runtime configuration (log directory, spark-submit options, ...).
     */
    TaskProperties taskProperties;

    /**
     * Constructor that does not care about originTaskId; a UUID is generated instead.
     *
     * @param jobId       job id
     * @param flowXmlFile path of the flow XML definition
     * @param taskParams  entry parameters provided to the app (may be {@code null})
     */
    public ShellNativeLocalTask(String jobId,
                                String flowXmlFile,
                                Map<String, String> taskParams) {
        this(jobId, flowXmlFile, UUID.randomUUID().toString().replaceAll("-", ""), taskParams);
    }


    /**
     * Full constructor.
     *
     * @param jobId        job id
     * @param flowXmlFile  path of the flow XML definition
     * @param originTaskId the taskInstanceId from the caller. The id Spark runs under and the
     *                     caller-specified id cannot be used interchangeably; this merely keeps
     *                     an association between them. Blank values are replaced by a UUID.
     * @param taskParams   entry parameters provided to the app (may be {@code null})
     */
    public ShellNativeLocalTask(String jobId,
                                String flowXmlFile,
                                String originTaskId,
                                Map<String, String> taskParams) {
        super(StringUtils.isBlank(originTaskId) ? UUID.randomUUID().toString().replaceAll("-", "") : originTaskId);
        this.jobId = jobId;
        // use the (possibly generated) id held by the parent, not the raw argument
        this.originTaskId = getTaskId();
        this.flowXmlFile = flowXmlFile;
        // never keep a null map — callers may legitimately pass none
        this.taskParams = Optional.ofNullable(taskParams).orElse(new HashMap<>());
    }



    /**
     * Assembles the runtime parameters and executes the flow.
     *
     * @return the parameter map the task was started with
     * @throws Exception if the underlying process fails or exits with a non-zero code
     */
    @Override
    public Map<String, Object> doExecute() throws Exception {
        JSONObject params = new JSONObject();
        params.put("id", jobId);
        params.put("originTaskId", originTaskId);
        params.put("queryDsl", taskParams.get("queryDsl"));
        params.put("location", "ug_export_" + originTaskId);
        execute(params);
        return params;
    }

    /**
     * Queues the task, runs the external process and handles success/failure bookkeeping.
     *
     * @param params runtime parameters; must be a {@link JSONObject}
     * @throws Exception if the process could not be run or exited with a non-zero code
     */
    private void execute(Map<String, Object> params) throws Exception {
        JSONObject map = (JSONObject) params;
        final String jobId = map.getString("id");
        // the front end's taskId — needed inside the task
        final String originTaskId = map.getString("originTaskId");

        // quartz job name is related to jobId
        LOG.info("Start SparkSQLJob {} at: {}", jobId, DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));

        // unique id for this run, distinct from both jobId and originTaskId
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        String[] args = this.parseArgs(jobId, originTaskId, map);

        try {
            // register the running task in the queue before launching the process
            JobQueue.pushNewTask(jobId, originTaskId, uuid, map);
            int exitCode = runAsRunTimeProcess(jobId,
                    originTaskId,
                    map,
                    args);
            if (exitCode != 0) {
                // abnormal termination of the runner script
                throw new IllegalStateException("Spark Task Invalid exitCode: " + exitCode);
            }
            JSONObject result = new JSONObject();
            result.put("source", "data3c");
            result.put("jobType", "SPARK");
            result.put("exitCode", exitCode);
            result.put("finalStatus", "FINISHED");
            result.put("taskName", map.getString("jobName"));
            result.put("process", 1);
            result.put("uuid", uuid);
            result.put("__ORIGIN_TASK_ID", originTaskId);
            LOG.info("{}/{} 运行结束。 STATE: {}", jobId, originTaskId, result.toJSONString());
        } catch (Exception e) {
            // drop the failed task from the queue, then propagate so callers see the failure
            JobQueue.removeErrorJob(jobId, originTaskId);
            LOG.error("run job error: ", e);
            throw e;
        }
    }


    /**
     * Invokes the system command that starts the job.
     *
     * <p>Resolves {@code sparksql-flow-runner.sh} relative to the
     * {@code SPARKSQL_FLOW_HOME} environment variable (or the
     * {@code sparksql_flow.home} system property), wires the process output into the
     * task log directory and blocks until the process terminates.
     *
     * @param jobId      job id
     * @param taskId     task instance id
     * @param taskConfig task configuration (supplies {@code jobName})
     * @param args       command-line arguments produced by {@link #parseArgs}
     * @return the process exit code; 0 means normal completion
     * @throws IllegalStateException if launching or waiting on the process fails
     */
    private int runAsRunTimeProcess(final String jobId,
                                    final String taskId,
                                    final JSONObject taskConfig,
                                    String[] args){
        String execute = "sparksql-flow-runner.sh";

        // if a flow home is configured, resolve the runner script under its bin/ directory
        if(System.getenv("SPARKSQL_FLOW_HOME") != null){
            String sparkHome = System.getenv("SPARKSQL_FLOW_HOME");
            execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
        } else if(System.getProperty("sparksql_flow.home") != null) {
            String sparkHome = System.getProperty("sparksql_flow.home");
            execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
        }
        String cmd = execute + " " + StringUtils.join(args, " ");
        LOG.info("execute command: " + cmd);
        try (SparkJobOutputStream out = new SparkJobOutputStream(taskProperties.getTaskLogDir(), jobId, taskId)){
            PumpStreamHandler streamHandler = new PumpStreamHandler(out);

            // no time limit on the child process
            final ExecuteWatchdog watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
            DefaultExecutor executor = new DefaultExecutor();
            executor.setExitValue(0);

            PidProcessDestroyer pidProcessDestroyer = new PidProcessDestroyer(taskId);
            executor.setStreamHandler(streamHandler);
            executor.setWatchdog(watchDog);
            executor.setProcessDestroyer(pidProcessDestroyer);
            // expose the watchdog so the queue can kill the task on demand
            JobQueue.addTaskObserver(jobId, taskId, watchDog);

            Map<String, String> environment = new HashMap<>(System.getenv());
            environment.put("SPARK_SUBMIT_OPTS", taskProperties.getSparkSubmitOpts());
            executor.setWorkingDirectory(new File(out.getTaskDir()));

            // handler notifies the warn service once the process finishes
            JobExecuteResultHandler handler = new JobExecuteResultHandler(
                    warnService);
            handler.setTaskName(taskConfig.getString("jobName"));
            environment.put("TASK_DIR", out.getTaskDir());
            executor.execute(CommandLine.parse(cmd), environment, handler);
            handler.waitFor();
            return handler.getExitValue();
        } catch (Exception e) {
            LOG.error("{}/{} 执行任务提交命令失败。", jobId, taskId, e);
            // preserve the original exception as the cause instead of dropping it
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            JobQueue.removeTaskObserver(jobId, taskId);
        }
    }



    /**
     * Parses the Job arguments into the runner's command line.
     *
     * @param jobId      job id
     * @param taskId     random unique id
     * @param taskConfig task configuration; every non-null entry is emitted as
     *                   {@code -p 'key=value'}
     * @return the argument vector, starting with {@code -f <flow xml path>}
     */
    protected String[] parseArgs(final String jobId, final String taskId, final JSONObject taskConfig) {
        List<String> list = new ArrayList<>();
        list.add("-f");
        list.add(new File(flowXmlFile).getAbsolutePath());
        for (Map.Entry<String, Object> entry : taskConfig.entrySet()) {
            if(entry.getValue() == null) {
                continue;
            }
            list.add("-p");
            // quote each key=value pair so the shell keeps it as one argument
            list.add("'" + entry.getKey() + "=" + entry.getValue() + "'");
        }

        LOG.info("args: " + StringUtils.join(list.iterator(), " "));
        return list.toArray(new String[0]);
    }

    public void setWarnService(WarnService warnService) {
        this.warnService = warnService;
    }

    public void setTaskProperties(TaskProperties taskProperties) {
        this.taskProperties = taskProperties;
    }


}
